package hotnet.processor;

import hotnet.future.WriteFuture;
import hotnet.session.IoSession;

import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleIoProcessorPool implements IoProcessor {
	private static final Logger logger = LoggerFactory.getLogger(SimpleIoProcessorPool.class);
	// TODO: private static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() + 1;
	private static final int DEFAULT_POOL_SIZE = 1;
	private final IoProcessor[] pool;
	private final Executor executor;
	private final ProcessorChooser chooser = new ProcessorChooser();
	
	private final Object disposalLock = new Object();
	private volatile boolean disposing;
	private volatile boolean disposed;
	
	public SimpleIoProcessorPool(Class<? extends IoProcessor> processorCls) {
		pool = new IoProcessor[DEFAULT_POOL_SIZE];
		
		executor = Executors.newCachedThreadPool();
		
		((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		
		boolean success = false;
		 
		Constructor<? extends IoProcessor> processorConstructor = null;
		try {
			try {
	            processorConstructor = processorCls.getConstructor(Executor.class);
	            pool[0] = processorConstructor.newInstance(this.executor);
	        } catch(NoSuchMethodException nse) {
	        	throw new IllegalArgumentException("no such method");
	        } catch (Exception e) {
	        	throw new RuntimeException(e);
			}
			
			if (processorConstructor != null) {
				for (int i = 1; i < pool.length; i++) {
	                try {
	                	pool[i] = processorConstructor.newInstance(this.executor);
	                } catch (Exception e) {
	                    // Won't happen because it has been done previousl
	                }
	            }
			}
			
			success = true;
		} finally {
			if (!success) {
				dispose();
			}
		}

	}
	
	@Override
	public boolean isDisposing() {
		return disposing;
	}
	@Override
	public boolean isDisposed() {
		return disposed;
	}
	@Override
	public void dispose() {
		if (disposed)
			return;
		
		synchronized (disposalLock) {
			if (!disposing) {
				disposing = true;
			}
			
			for (IoProcessor processor : pool) {
				if (processor != null) {
					if (processor.isDisposing() || processor.isDisposed()) {
						continue;
					}
					
					try {
                        processor.dispose();
                    } catch (Exception e) {
                    	logger.warn("Failed to dispose the {} IoProcessor.", processor.getClass().getSimpleName(), e);
                    }
				}
			}
			
			((ExecutorService)this.executor).shutdown();
		}
		
		Arrays.fill(pool, null);
		disposed = true;
	}
	@Override
	public void add(IoSession session) {
		chooser.processorChooser(session).add(session);
		
	}
	@Override
	public void remove(IoSession session) {
		chooser.processorChooser(session).remove(session);
		
	}
	@Override
	public void flush(IoSession session) {
		chooser.processorChooser(session).flush(session);
		
	}
	@Override
	public void write(IoSession session, WriteFuture writeRequest) {
		chooser.processorChooser(session).write(session, writeRequest);
		
	}
	
	private class ProcessorChooser {
		public IoProcessor processorChooser(IoSession session) {
			IoProcessor processor;
			
			if (disposed || disposing) {
				throw new IllegalStateException("A disposed processor cannot be accessed.");
			}
			
			processor = pool[Math.abs((int) session.getSessionId()) % pool.length];
			
			return processor;
		}
	}
}
